package poolserver

import (
	"encoding/binary"
	"errors"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	collector "csudata.com/zqpool/src/collector"
	config "csudata.com/zqpool/src/config"
	"go.uber.org/zap"
)

// DEFAULT_BUF_SIZE is the default per-session message buffer size in bytes
// (used by ConnContext.Init when no size is given).
const DEFAULT_BUF_SIZE int32 = 65536
// MAX_ERROR_MSG_SIZE caps the size of an error message payload.
const MAX_ERROR_MSG_SIZE int32 = 512
const MAX_PREPARE_STMT_SIZE = 65536 * 2 /* maximum length of a Prepare SQL text; anything longer triggers an error */

// PgFrontConn represents one client (frontend) connection, identified by the
// Pid/BackendKeyData pair handed to the client at startup.
// NOTE(review): presumably the pair is matched against CancelRequest packets —
// confirm against the connection handler.
type PgFrontConn struct {
	Conn           net.Conn // TCP connection from the client
	Pid            uint32   // pseudo process id reported to the client
	BackendKeyData uint32   // secret key paired with Pid
}

var g_next_pid uint32 = 1000

// PgBackendConn is one pooled connection to a PostgreSQL backend.
// State is claimed/released with atomic operations (CAS 0->1 to claim,
// store 0 to release).
type PgBackendConn struct {
	Id               int32
	Conn             net.Conn
	State            int32 // 0 = idle, 1 = claimed; mutated only via sync/atomic
	nextPrepareId    int32
	needClosePrepare []int32 // prepare ids queued for Close, flushed by CleanupNeedClosePrepare
	ReConnCnt        int32   // reconnect count; incremented by 1 on every reconnection
	Lock             sync.Mutex
	//poolConf         *config.PoolConfig
	LastFrontId   uint32 // id of the frontend that last used this backend
	ConnUnixTime  int64  // unix timestamp of when the connection was established
	LifeUsedCount uint32 // number of times this connection has been handed out
	ConnPortal    string // name (address) of the portal this connection targets
}

// AllocPrepareId returns a new, unique prepared-statement id for this
// backend connection.
// FIXME: the int32 counter can eventually overflow.
func (p *PgBackendConn) AllocPrepareId() int32 {
	return atomic.AddInt32(&p.nextPrepareId, 1)
}

// DeletePrepareId queues prepareId for later cleanup on this backend;
// the actual Close message is sent by CleanupNeedClosePrepare.
func (p *PgBackendConn) DeletePrepareId(prepareId int32) {
	p.Lock.Lock()
	p.needClosePrepare = append(p.needClosePrepare, prepareId)
	p.Lock.Unlock()
}

// PrepareIdData pairs a backend prepare id with the serialized prepare
// request that created it.
type PrepareIdData struct {
	PrepareId      int32
	PrepareRequest []byte
}

// PrepareInBackend records where a prepared statement lives: the backend
// connection and the id it received there.
type PrepareInBackend struct {
	PrepareId int32
	BackConn  *PgBackendConn
	ReConnCnt int32 // the backend's reconnect count at the time the prepare completed; if the backend later reconnects, its own count is incremented and no longer matches this value, revealing that the connection was re-established
}

// DeletePrepare queues this prepared statement for removal on the backend
// connection it was created on.
func (p *PrepareInBackend) DeletePrepare() {
	backend := p.BackConn
	backend.DeletePrepareId(p.PrepareId)
}

// PrepareData is one cached prepared statement: the request bytes sent to
// the backend plus, keyed by backend id, where the statement was prepared.
type PrepareData struct {
	PrepareRequest []byte /* the prepare data sent to the backend */
	BackendMap     map[int32]*PrepareInBackend
}

/* CachedPrepare records the prepare data issued by one frontend connection,
keyed by statement name. */
type CachedPrepare struct {
	mp map[string]*PrepareData
}

// CuPre ("Cumulative Prepare") holds one staged prepare (P) request from
// the client until the accompanying Sync (S) message arrives.
type CuPre struct {
	backendPrepareId   int32
	prepareRequestData []byte
	prepareRequestLen  int32
}

// Pool is one backend connection pool: its configuration and the fixed set
// of backend connections created at startup.
type Pool struct {
	Conf    *config.PoolConfig
	BeConns []*PgBackendConn
}

// ConnContext holds all per-client-session state: message buffers, cached
// prepared statements, transaction status and the backend connection
// currently claimed for this client.
type ConnContext struct {
	msgList  [][]byte
	msgCount int32
	recvBuf  []byte
	sendBuf  []byte
	sendLen  int32
	//recvLen  int32
	//leftLen  int32
	recvBufSize int32

	cachedPrepare CachedPrepare
	transState    byte /* transaction status of the session; initialized to 'I' */
	pool          *Pool

	isInUnnamedPrepared bool
	unnamePrepareData   []byte

	/* A client request may carry several P (Parse) messages and one S (Sync)
	   message, so the P messages are staged in cupreMap (Cumulative Prepare). */
	cupreMap map[string]*CuPre

	//prepareRequestData []byte
	//prepareRequestLen int32
	//prepareName string
	//backendPrepareId int32

	Pid uint32 // pseudo PID assigned from g_next_pid in Init

	cliConn net.Conn // the client (frontend) connection

	isGetBackend  bool
	pBackConn     *PgBackendConn
	pSendBackConn *PgBackendConn
}

// Init allocates the empty statement-name -> PrepareData map.
func (p *CachedPrepare) Init() {
	p.mp = map[string]*PrepareData{}
}

// Get looks up the cached prepare data for prepareName; the second result
// reports whether an entry exists.
func (p *CachedPrepare) Get(prepareName string) (*PrepareData, bool) {
	data, found := p.mp[prepareName]
	return data, found
}

// AddPrepare stores prepareData under prepareName, replacing any existing
// entry with the same name.
func (p *CachedPrepare) AddPrepare(prepareName string, prepareData *PrepareData) {
	p.mp[prepareName] = prepareData
}

// DeletePrepare removes prepareName from the cache, queueing the statement
// for deletion on every backend it was prepared on.  A missing name is a
// no-op.
func (p *CachedPrepare) DeletePrepare(prepareName string) {
	if prepareData, ok := p.mp[prepareName]; ok {
		for _, inBackend := range prepareData.BackendMap {
			inBackend.DeletePrepare()
		}
		delete(p.mp, prepareName)
	}
}

// Discard queues every cached prepared statement for deletion on its
// backend, then drops the whole cache map.
func (p *CachedPrepare) Discard() {
	for _, prepareData := range p.mp {
		for _, inBackend := range prepareData.BackendMap {
			inBackend.DeletePrepare()
		}
	}
	p.mp = nil
}

// g_backend_pool maps a pool name ("feUser.feDbName", see StartServer) to
// its backend connection pool; populated once at startup.
var g_backend_pool = make(map[string]*Pool, 1)

// g_pools_version maps the same pool name to the backend server version
// string obtained from the first connection of the pool.
var g_pools_version = make(map[string]string, 1)

// appendString copies str into buf followed by a NUL terminator and
// returns the number of bytes written (len(str)+1).  buf must have room
// for the string plus the terminator.
func appendString(buf []byte, str string) int32 {
	written := copy(buf, str)
	buf[written] = 0
	written++
	return int32(written)
}

/*
connectBackend dials the next portal from poolConf and performs the
PostgreSQL v3 startup handshake: it sends a StartupMessage carrying the
connection options, answers an md5 password challenge if requested (trust,
i.e. no authentication, is also accepted), then drains ParameterStatus and
error messages until ReadyForQuery.  It returns the established connection
together with the portal address that was used.
*/
func connectBackend(poolConf *config.PoolConfig) (net.Conn, string, error) {
	var conn net.Conn
	var err error
	var n int32
	var buf []byte
	var recvBuf []byte
	var pos int32

	buf = make([]byte, 2048)
	recvBuf = make([]byte, 2048)
	var portal = poolConf.GetNextPortal()
	d := net.Dialer{Timeout: 10 * time.Second}
	conn, err = d.Dial("tcp", portal)
	if err != nil {
		zap.S().Infof("Could not connect to %s: %s", portal, err.Error())
		return nil, portal, err
	}

	// StartupMessage layout: int32 length + int32 protocol version, then
	// NUL-terminated key/value option pairs and a final NUL byte.
	pos = 8

	connOpts := map[string]string{
		"user":                poolConf.BeUser,
		"database":            poolConf.BeDbName,
		"client_encoding":     "UTF8",
		"application_name":    "zqpool",
		"password_encryption": "md5",
	}

	for key, value := range connOpts {
		pos += appendString(buf[pos:], key)
		pos += appendString(buf[pos:], value)
	}
	buf[pos] = 0
	pos++
	// Fill in the total packet length ...
	binary.BigEndian.PutUint32(buf[0:], uint32(pos))
	// ... and the protocol version, 3.0.
	binary.BigEndian.PutUint16(buf[4:], 3)
	binary.BigEndian.PutUint16(buf[6:], 0)

	_, err = sendData(conn, buf[0:pos])
	if err != nil {
		zap.S().Infof("Send to db error: %s", err.Error())
		conn.Close()
		return nil, portal, err
	}

	n, _, err = recvMessage(conn, recvBuf[0:])
	if err != nil {
		zap.S().Infof("Recv from db error: %s", err.Error())
		conn.Close()
		return nil, portal, err
	}

	if recvBuf[0] != 'R' {
		// BUG FIX: the previous code logged buf[0]/string(buf) (the *send*
		// buffer) although the server's reply is in recvBuf.
		zap.S().Infof("Expect from db recv 'R' message, but recv %c message: %v", recvBuf[0], string(recvBuf[:n]))
		conn.Close()
		return nil, portal, errors.New("receive invalid packet")
	}
	authType := binary.BigEndian.Uint32(recvBuf[5:])
	if authType == 5 { /* type = 5 is md5 authentication */

		/* concat('md5', md5(concat(md5(concat(password, username)), random-salt))) */

		md5AuthCalc(buf[5:], poolConf.BeUser, poolConf.BePasswd, recvBuf[9:13])
		buf[0] = 'p'
		buf[40] = 0
		binary.BigEndian.PutUint32(buf[1:], 40)
		n, err = sendData(conn, buf[:41])
		if err != nil {
			zap.S().Infof("Send to PoolConfig error: %s", err.Error())
			conn.Close()
			return nil, portal, err
		}
	} else if authType == 0 { /* type = 0 is trust: connection succeeded, nothing to do */

	} else {
		zap.S().Infof("Only support md5(type=5) and trust(type=0), can not support type = %d", authType)
		conn.Close()
		return nil, portal, errors.New("only support md5(type=5)")
	}

	// Drain startup replies until ReadyForQuery.
	for {
		n, _, err = recvMessage(conn, buf[:])
		if err != nil {
			zap.S().Infof("Recv from Db error: %s", err.Error())
			conn.Close()
			return nil, portal, err
		}
		if buf[0] == 'S' { /* ParameterStatus */
			parseKeyValuePacket(buf[1:])
		} else if buf[0] == 'E' { /* ErrorResponse: log the message text */
			n = int32(binary.BigEndian.Uint32(buf[1:]))
			zap.S().Infof("Recv from PoolConfig Error: %s", string(buf[5:n-1]))
		} else if buf[0] == 'Z' { /* ReadyForQuery: handshake complete */
			break
		}
	}
	return conn, portal, nil
}

// Reconnect closes the current backend connection and dials again until a
// connection succeeds, sleeping an increasing number of seconds between
// attempts (capped at 10s).  On success it refreshes ConnUnixTime, bumps
// ReConnCnt and returns the portal that was connected.
func (p *PgBackendConn) Reconnect(poolConf *config.PoolConfig) string {
	var portal string
	var err error

	p.Conn.Close()
	for attempt := 0; ; {
		zap.S().Infof("Reconnect: %v", poolConf.PortalList)
		p.Conn, portal, err = connectBackend(poolConf)
		if err == nil {
			break
		}
		attempt++
		zap.S().Infof("Could not connect(%s): %s", portal, err.Error())
		wait := attempt
		if wait > 10 {
			wait = 10
		}
		time.Sleep(time.Duration(wait) * time.Second)
	}
	p.ConnUnixTime = time.Now().Unix()
	p.ReConnCnt++
	return portal
}

// BeReconnect re-establishes this session's backend connection using the
// pool's configuration and returns the portal that was connected.
func (ctx *ConnContext) BeReconnect() string {
	return ctx.pBackConn.Reconnect(ctx.pool.Conf)
}

/*
CleanupTrans aborts any open transaction on conn by sending a rollback and
draining replies until ReadyForQuery ('Z') arrives.  Returns the first
send/receive error encountered, nil otherwise.
*/
func CleanupTrans(conn net.Conn) error {
	if err := sendRollback(conn); err != nil {
		return err
	}

	var buf [1024]byte
	for {
		if _, _, err := recvMessage(conn, buf[:]); err != nil {
			return err
		}
		if buf[0] == 'Z' { /* ReadyForQuery */
			return nil
		}
	}
	// (the unreachable trailing `return nil` after the infinite loop was removed)
}

/*
CleanupNeedClosePrepare sends a Close+Sync for every prepare id queued via
DeletePrepareId and drains the backend's replies, then clears the queue.
Callers in this file invoke it only after claiming the backend (State CAS
0->1), so Conn is not used concurrently.
*/
func (p *PgBackendConn) CleanupNeedClosePrepare() {
	var err error
	p.Lock.Lock()
	defer p.Lock.Unlock()
	var prepareId int32
	for _, prepareId = range p.needClosePrepare {
		err = sendPqClosePrepare(p.Conn, prepareId)
		if err != nil {
			break
		}
		err = sendPqSync(p.Conn)
		if err != nil {
			break
		}

		var buf [1024]byte
		// Drain replies until ReadyForQuery for this Close/Sync pair.
		for {
			_, _, err = recvMessage(p.Conn, buf[:])
			if err != nil {
				zap.S().Infof("Recv from PoolConfig error: %s", err.Error())
				// NOTE(review): this early return keeps the remaining ids in
				// needClosePrepare, while a send error above breaks out and
				// discards them (the list is nil'd below) — confirm which
				// behavior is intended.
				return
			}
			if buf[0] == 'Z' { /* ReadyForQuery */
				break
			}
			if buf[0] == 'E' { /* ErrorResponse from the backend */
				zap.S().Infof("Cleanup prepare failed!")
			}
		}
	}
	p.needClosePrepare = nil
}

// getBackend claims an idle backend connection from pool, spinning with an
// increasing back-off until one becomes free.  The claimed connection's
// State is flipped 0->1 via CAS; the caller must release it by storing 0.
// This function only returns a non-nil backend (it waits forever).
func getBackend(pool *Pool) *PgBackendConn {
	var cnt = 10
	for {
		for _, backend := range pool.BeConns {
			if atomic.CompareAndSwapInt32(&backend.State, 0, 1) {
				backend.LifeUsedCount++
				return backend
			}
		}
		/* Nothing free: back off for (cnt+1)*5ms before scanning again. */
		time.Sleep(time.Duration(cnt+1) * 5 * time.Millisecond)
		collector.IncreaseBackendConnectionLimitReachedTimes(pool.Conf.ID)
		cnt += 1
		if cnt%10 == 0 {
			zap.S().Infof("Wait 10 times for get backend connection!")
		}

		// Cap the back-off growth.
		if cnt > 200 {
			cnt = 200
		}
	}
	// (the unreachable trailing `return nil` after the infinite loop was removed)
}

// Init prepares the context for a freshly accepted client connection:
// it allocates the receive/send buffers (msgBufSize == 0 selects
// DEFAULT_BUF_SIZE), assigns a process-wide pseudo PID and resets all
// per-session state.
func (ctx *ConnContext) Init(cliConn net.Conn, msgBufSize int) {
	if msgBufSize == 0 {
		msgBufSize = int(DEFAULT_BUF_SIZE)
	}

	ctx.cliConn = cliConn
	ctx.recvBufSize = int32(msgBufSize)
	ctx.recvBuf = make([]byte, msgBufSize)
	// The send buffer is slightly larger to leave headroom for headers.
	ctx.sendBuf = make([]byte, msgBufSize+1024)
	ctx.Pid = atomic.AddUint32(&g_next_pid, 1)
	ctx.msgList = make([][]byte, 512)

	ctx.isGetBackend = false
	ctx.pBackConn = nil
	ctx.isInUnnamedPrepared = false
	ctx.transState = 'I' // initial transaction status
	ctx.cupreMap = make(map[string]*CuPre)
}

// noused is an identity helper used to silence "assigned but not used"
// diagnostics for values that are intentionally ignored.
func noused(n int32) int32 {
	return n
}

func checkPool() {
	var pool *Pool
	var backConn *PgBackendConn
	for {
		time.Sleep(10 * time.Second)
		for _, pool = range g_backend_pool {
			for _, backConn = range pool.BeConns {
				if atomic.CompareAndSwapInt32(&backConn.State, 0, 1) {
					zap.S().Debugf("checkPool: get backend(%d)", backConn.Id)
					backConn.CleanupNeedClosePrepare()
					if pool.Conf.BeConnLifeTime > 0 && backConn.LifeUsedCount > 0 {
						if time.Now().Unix()-backConn.ConnUnixTime > int64(pool.Conf.BeConnLifeTime) {
							portal := backConn.Reconnect(pool.Conf)
							zap.S().Infof("backConn(%d) life cycle has ended, release and reconnect %s", backConn.Id, portal)
							backConn.LifeUsedCount = 0
						}
					}
					atomic.StoreInt32(&backConn.State, 0)
					zap.S().Debugf("checkPool: release backend(%d)", backConn.Id)
				}
			}
		}
	}
}

func CollectBackendConnections() {
	var pool *Pool
	for _, pool = range g_backend_pool {
		collector.UpdateBackendConnections(pool.Conf.ID, float64(len(pool.BeConns)))
	}
}

func CollectActiveBackendConnections() {
	var pool *Pool
	var backendConn *PgBackendConn
	var activeBackendConnNum float64 = 0
	for _, pool = range g_backend_pool {
		for _, backendConn = range pool.BeConns {
			stateValue := atomic.LoadInt32(&backendConn.State)
			if stateValue == 1 {
				activeBackendConnNum += 1
			}
		}
		collector.UpdateAcitveBackendConnections(pool.Conf.ID, activeBackendConnNum)
		activeBackendConnNum = 0
	}
}

func PoolReleaseBeDb(poolName string, portal string) error {
	var pool *Pool
	var backConn *PgBackendConn
	pool, ok := g_backend_pool[poolName]
	if !ok {
		zap.S().Infof("pool(%s) not exists, pools is %v ", poolName, g_backend_pool)
		return errors.New(fmt.Sprintf("pool(%s) not exists, pools is %v ", poolName, g_backend_pool))
	}

	for _, backConn = range pool.BeConns {
		if backConn.ConnPortal != portal {
			continue
		}
		if atomic.CompareAndSwapInt32(&backConn.State, 0, 1) {
			backConn.CleanupNeedClosePrepare()
			if pool.Conf.BeConnLifeTime > 0 && backConn.LifeUsedCount > 0 {
				if time.Now().Unix()-backConn.ConnUnixTime > int64(pool.Conf.BeConnLifeTime) {
					zap.S().Infof("backConn(%d) has released", backConn.Id)
					backConn.Reconnect(pool.Conf)
					backConn.LifeUsedCount = 0
				}
			}
			backConn.State = 0
		}
	}
	return nil
}

// StartServer builds every configured backend pool (establishing all
// backend connections up front), launches the pool-maintenance and metrics
// goroutines, then accepts client connections on listenAddr forever.
func StartServer(listenAddr string) {
	poolsConf := config.GetAllPoolsConfig()
	for _, poolConf := range poolsConf {
		poolName := poolConf.FeUser + "." + poolConf.FeDbName
		poolSize := poolConf.BeConns
		pool := new(Pool)
		pool.BeConns = make([]*PgBackendConn, poolSize)
		pool.Conf = poolConf
		g_backend_pool[poolName] = pool

		for i := 0; i < poolSize; i++ {
			conn, portal, err := connectBackend(poolConf)
			if err != nil {
				// Previously this failure was silent; log before giving up.
				zap.S().Errorf("Could not build pool(%s): %s", poolName, err.Error())
				return
			}
			pBackend := new(PgBackendConn)
			pBackend.Id = int32(i + 1)
			pBackend.Conn = conn
			pBackend.State = 0
			pBackend.nextPrepareId = 0
			pBackend.ReConnCnt = 0
			pBackend.ConnPortal = portal
			pool.BeConns[i] = pBackend
			if i == 0 {
				// Probe the server version once per pool (error intentionally
				// ignored, matching previous behavior).
				_, serverVersion := getDbServerVersion(conn)
				g_pools_version[poolName] = serverVersion
				zap.S().Infof("Pool(%s) ServerVersion=%s", poolName, serverVersion)
			}
		}
	}

	go checkPool()

	zap.S().Infof("Starting server on %s ...", listenAddr)
	addr, err := net.ResolveTCPAddr("tcp", listenAddr)
	if err != nil {
		// BUG FIX: this error used to be ignored, letting ListenTCP receive
		// a nil address.
		zap.S().Errorf("Invalid listen address %s: %s", listenAddr, err.Error())
		return
	}

	listener, err := net.ListenTCP("tcp", addr)
	if err != nil {
		zap.S().Errorf("Could not listen on %s: %s", listenAddr, err.Error())
		return
	}

	// Periodically publish pool metrics.
	ticker := time.NewTicker(10 * time.Millisecond)
	go func() {
		for range ticker.C {
			CollectBackendConnections()
			CollectActiveBackendConnections()
		}
	}()

	for {
		con, err := listener.Accept()
		if err != nil {
			zap.S().Error("Failed to accepting a connection: ", err)
			// BUG FIX: do not hand a failed (possibly nil) connection to the
			// handler; skip it and keep accepting.
			continue
		}
		go handleConnection(con)
	}
}
